package p2p

import (
	"bufio"
	"encoding/json"
	"fmt"
	"osiris/core/consensus"
	"osiris/core/conversion"
	"osiris/dto"
	"osiris/logger"
	"sync"

	"github.com/libp2p/go-libp2p/core/network"
	peer "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// PosProtocol is the block-broadcast protocol. It embeds MulticastProtocol and
// adds the PoS-specific stream handling implemented by its methods in this file.
type PosProtocol struct {
	MulticastProtocol
}

// protocolName returns the libp2p protocol ID under which PoS block broadcasts
// are exchanged.
func (protocol PosProtocol) protocolName() protocol.ID {
	// Untyped constant: it takes the declared result type on return. The
	// package name is shadowed by the receiver inside the body, so the type
	// cannot be spelled out here.
	const posMulticastID = "/p2p/multicast/pos"
	return posMulticastID
}

// openStream broadcasts jsonStr under the PoS protocol ID by delegating to
// openMultiStream. When wg is non-nil, wg.Done is called once the broadcast
// call has returned (or panicked).
//
// peerID is accepted but not used by this implementation.
func (protocol PosProtocol) openStream(peerID peer.ID, jsonStr string, wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}

	name := protocol.protocolName()
	protocol.openMultiStream(name, jsonStr)
}

// handleStream handles one inbound stream of the PoS block-broadcast protocol.
//
// It reads a single newline-terminated JSON BlockDto from s and relays that
// same line through openMultiStream only if every check passes:
//
//   - the line can be read and unmarshalled,
//   - consensus.IsHeaderInBuf does not report the header as already seen,
//   - WriterPeerID decodes to a peer.ID,
//   - blockDto.Verify succeeds with the writer's public key, and
//   - consensus.OnReceiveBlock returns no error.
//
// Any failure is logged and the block is dropped. The remote peer is added to
// DynamicExcluded up front so that it is excluded from the relay targets. The
// call blocks until the relay (if any) has returned, and the stream is closed
// on the way out.
func (protocol PosProtocol) handleStream(s network.Stream) {
	// The handler is done with the stream when it returns, and libp2p requires
	// Close or Reset at that point to release it. Closing is best-effort: if it
	// fails there is nothing further to clean up here.
	defer func() { _ = s.Close() }()

	name := protocol.protocolName()
	logger.Println("Got a new stream " + name)
	logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s]", name): "Got a new stream "})

	// Exclude the sender's peer.ID from the relay targets.
	// NOTE(review): DynamicExcluded is a map written here without any visible
	// synchronization; confirm handlers cannot run concurrently on the same map.
	fromPeerID := s.Conn().RemotePeer()
	protocol.DynamicExcluded[fromPeerID] = true

	// A message is exactly one '\n'-terminated line of JSON.
	str, err := bufio.NewReader(s).ReadString('\n')
	if err != nil {
		logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s] read string", name): err})
		return
	}

	var blockDto dto.BlockDto
	if err := json.Unmarshal([]byte(str), &blockDto); err != nil {
		logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] BlockDto unmarshal", name): err})
		return
	}

	// Drop blocks whose header is already in the recent-header buffer.
	if consensus.IsHeaderInBuf(blockDto.BlockHeader) {
		logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] ", name): "duplicate BlockDto"})
		return
	}
	// Not a duplicate: remember the header.
	// NOTE(review): the header is recorded before the block is verified, so a
	// block that later fails verification still marks its header as seen;
	// confirm this is intended.
	consensus.AddRecentHeader(blockDto.BlockHeader)

	// Look up the public key of the peer named as the block's writer.
	peerID, err := conversion.StrToPeerID(blockDto.WriterPeerID)
	if err != nil {
		logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] decode peer.ID", name): err})
		return
	}
	pubKey := GetPubKey(peerID)

	// Verify the block against the writer's public key.
	if !blockDto.Verify(pubKey) {
		logger.Warn(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] verify txDto", name): "fail"})
		return
	}

	// TODO chainID: the chain ID is hard-coded to 0.
	if err := consensus.OnReceiveBlock(0, GetSelfID(), blockDto, pubKey); err != nil {
		logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] consensus.OnReceiveBlock()", name): err})
		return
	}

	// If this node is the next writer, start producing and broadcasting the
	// next block in the background; the relay below does not wait for it.
	if blockDto.NextPeer == GetSelfID().String() {
		go protocol.reopenStream()
	}

	// Every check passed: relay the original JSON line unchanged.
	protocol.openMultiStream(name, str)
}

// reopenStream prepares and sends a new block broadcast. handleStream starts it
// when a received block names this node as the next writer.
//
// It runs consensus.OnCreateBlock in its own goroutine, blocks until a block
// JSON string arrives on the channel handed to that call, and then broadcasts
// the string through openMultiStream with an empty exclusion set.
//
//	@receiver protocol
func (protocol PosProtocol) reopenStream() {
	produced := make(chan string)

	// TODO chainID: the chain ID is hard-coded to 0.
	// Start block creation; the result is delivered on produced.
	go consensus.OnCreateBlock(0, GetSelfID(), produced)

	// Wait for the generated block, then close the channel.
	// NOTE(review): the channel is closed by the receiver; this presumes
	// OnCreateBlock sends exactly once and never closes it itself — confirm.
	payload := <-produced
	close(produced)

	// Reset the broadcast blacklist before broadcasting the new block.
	// NOTE(review): protocol is a value receiver, so this assignment presumably
	// replaces the map on this call's copy only — confirm against how
	// DynamicExcluded is declared.
	protocol.DynamicExcluded = map[peer.ID]bool{}
	protocol.openMultiStream(protocol.protocolName(), payload)
}
